/*******************************************************************************
 * Package: com.song.visit
 * Type:    UserVisitMap3
 * Date:    2024-11-21 19:00
 *
 * Copyright (c) 2024 LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.visit;

import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Counts clicks, orders and payments per product category from a user-visit action log using
 * {@code flatMapToPair} + {@code reduceByKey}, then prints the first 10 categories after sorting.
 *
 * <p>Each input line is an underscore-separated record. The fields read here are:
 * <ul>
 *   <li>5 - search value; {@code "null"} when the action is not a search (search rows are dropped)</li>
 *   <li>6 - clicked category id; {@code "-1"} when the action is not a click</li>
 *   <li>8 - comma-separated ordered category ids; {@code "null"} when there is no order</li>
 *   <li>10 - comma-separated paid category ids (assumed to use the same {@code "null"} marker)</li>
 * </ul>
 *
 * @author Songxianyang
 * @date 2024-11-21 19:00
 */
public class UserVisitMap3 {

    /** Input used when no path is passed as the first command-line argument. */
    private static final String DEFAULT_INPUT = "yqmm-spark/file/user_visit_action_rdd.txt";

    private static final String FIELD_SEPARATOR = "_";
    private static final String ID_SEPARATOR = ",";
    private static final String NULL_MARKER = "null";
    private static final String NO_CLICK = "-1";

    private static final int SEARCH_FIELD = 5;
    private static final int CLICK_CATEGORY_FIELD = 6;
    private static final int ORDER_CATEGORIES_FIELD = 8;
    private static final int PAY_CATEGORIES_FIELD = 10;
    /** Smallest field count that contains every index read by this job. */
    private static final int MIN_FIELDS = PAY_CATEGORIES_FIELD + 1;

    /**
     * Runs the job locally and prints the first 10 per-category counters.
     *
     * @param args optional; {@code args[0]} overrides the input path
     */
    public static void main(String[] args) {
        // 1. Create the configuration object (local mode, all cores).
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;

        // 2. Create the Spark context; try-with-resources closes it even if the job fails.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> lines = sc.textFile(input);
            // Drop search actions (and malformed lines); only click / order / payment rows remain.
            JavaRDD<String> filterRdd = lines.filter(UserVisitMap3::isNonSearchAction);

            filterRdd.flatMapToPair(UserVisitMap3::toCategoryEvents)
                    // Aggregate all counters that share the same category id (the key).
                    .reduceByKey(UserVisitMap3::mergeCounts)
                    .values()
                    // Sorting relies on UserVisitDto implementing Comparable.
                    .sortBy(obj -> obj, true, 2)
                    .take(10)
                    .forEach(System.out::println);
        }
    }

    /**
     * Returns {@code true} for well-formed rows that are not search actions.
     * Rows with fewer than {@link #MIN_FIELDS} fields are rejected instead of throwing
     * {@code ArrayIndexOutOfBoundsException} inside a Spark task.
     */
    private static boolean isNonSearchAction(String row) {
        String[] fields = row.split(FIELD_SEPARATOR);
        return fields.length >= MIN_FIELDS && NULL_MARKER.equals(fields[SEARCH_FIELD]);
    }

    /**
     * Converts one non-search row into (category id, counter) pairs.
     * A click yields (id, 1, 0, 0); an order yields (id, 0, 1, 0) per ordered category;
     * a payment yields (id, 0, 0, 1) per paid category. A row matching none of these yields
     * nothing, rather than a record for a bogus "null" category.
     *
     * <p>Precondition: the row passed {@link #isNonSearchAction(String)}, so every field index
     * read here exists.
     */
    private static Iterator<Tuple2<String, UserVisitDto>> toCategoryEvents(String row) {
        String[] fields = row.split(FIELD_SEPARATOR);
        List<Tuple2<String, UserVisitDto>> events = new ArrayList<>();

        // Anything other than -1 in the click field means this row is a click.
        String clickedCategory = fields[CLICK_CATEGORY_FIELD];
        if (!NO_CLICK.equals(clickedCategory)) {
            events.add(new Tuple2<>(clickedCategory, new UserVisitDto(clickedCategory, 1L, 0L, 0L)));
            return events.iterator();
        }

        String orderedCategories = fields[ORDER_CATEGORIES_FIELD];
        if (!NULL_MARKER.equals(orderedCategories)) {
            for (String categoryId : orderedCategories.split(ID_SEPARATOR)) {
                events.add(new Tuple2<>(categoryId, new UserVisitDto(categoryId, 0L, 1L, 0L)));
            }
            return events.iterator();
        }

        String paidCategories = fields[PAY_CATEGORIES_FIELD];
        if (!NULL_MARKER.equals(paidCategories)) {
            for (String categoryId : paidCategories.split(ID_SEPARATOR)) {
                events.add(new Tuple2<>(categoryId, new UserVisitDto(categoryId, 0L, 0L, 1L)));
            }
        }
        return events.iterator();
    }

    /**
     * Adds the counters of {@code right} into {@code left} and returns {@code left}.
     * Mutating the left operand is acceptable here because every value is a fresh object
     * created in {@link #toCategoryEvents(String)} and the pair RDD is not cached or reused.
     */
    private static UserVisitDto mergeCounts(UserVisitDto left, UserVisitDto right) {
        left.setClicks(left.getClicks() + right.getClicks());
        left.setNumberOrders(left.getNumberOrders() + right.getNumberOrders());
        left.setNumberPayments(left.getNumberPayments() + right.getNumberPayments());
        return left;
    }
}
